{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Load the airport and flight data from Cloudant"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "cloudantHost='dtaieb.cloudant.com'\n",
    "cloudantUserName='weenesserliffircedinvers'\n",
    "cloudantPassword='72a5c4f939a9e2578698029d2bb041d775d088b5'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "airports = sqlContext.read.format(\"com.cloudant.spark\").option(\"cloudant.host\",cloudantHost)\\\n",
    "    .option(\"cloudant.username\",cloudantUserName).option(\"cloudant.password\",cloudantPassword)\\\n",
    "    .option(\"schemaSampleSize\", \"-1\").load(\"flight-metadata\")\n",
    "airports.cache()\n",
    "airports.registerTempTable(\"airports\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false,
    "pixiedust": {
     "displayParams": {
      "handlerId": "dataframe"
     }
    }
   },
   "outputs": [],
   "source": [
    "import pixiedust\n",
    "\n",
    "# Display the airports data\n",
    "display(airports)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "flights = sqlContext.read.format(\"com.cloudant.spark\").option(\"cloudant.host\",cloudantHost)\\\n",
    "    .option(\"cloudant.username\",cloudantUserName).option(\"cloudant.password\",cloudantPassword)\\\n",
    "    .option(\"schemaSampleSize\", \"-1\").load(\"pycon_flightpredict_training_set\")\n",
    "flights.cache()\n",
    "flights.registerTempTable(\"training\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false,
    "pixiedust": {
     "displayParams": {
      "handlerId": "dataframe"
     }
    }
   },
   "outputs": [],
   "source": [
    "# Display the flights data\n",
    "display(flights)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Build the vertices and edges dataframe from the data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "from pyspark.sql import functions as f\n",
    "from pyspark.sql.types import *\n",
    "\n",
    "rdd = flights.rdd.flatMap(lambda s: [s.arrivalAirportFsCode, s.departureAirportFsCode]).distinct()\\\n",
    "    .map(lambda row:[row])\n",
    "vertices = airports.join(\n",
    "      sqlContext.createDataFrame(rdd, StructType([StructField(\"fs\",StringType())])), \"fs\"\n",
    "    ).dropDuplicates([\"fs\"]).withColumnRenamed(\"fs\",\"id\")\n",
    "\n",
    "print(vertices.count())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "edges = flights.withColumnRenamed(\"arrivalAirportFsCode\",\"dst\")\\\n",
    "    .withColumnRenamed(\"departureAirportFsCode\",\"src\")\\\n",
    "    .drop(\"departureWeather\").drop(\"arrivalWeather\").drop(\"pt_type\").drop(\"_id\").drop(\"_rev\")\n",
    "\n",
    "print(edges.count())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Install GraphFrames package using PixieDust packageManager  \n",
    "\n",
    "The [GraphFrames package](https://mvnrepository.com/artifact/graphframes/graphframes) to install depends on the environment.\n",
    "\n",
    "**Spark 1.6**\n",
    "\n",
    "- `graphframes:graphframes:0.5.0-spark1.6-s_2.11`\n",
    "\n",
    "**Spark 2.x**\n",
    "\n",
    "- `graphframes:graphframes:0.5.0-spark2.1-s_2.11`\n",
    "\n",
    "In addition, recent versions of graphframes have dependencies on other packages which will need to also be installed:\n",
    "\n",
    "- `com.typesafe.scala-logging:scala-logging-api_2.11:2.1.2`\n",
    "- `com.typesafe.scala-logging:scala-logging-slf4j_2.11:2.1.2`\n",
    "\n",
    "> Note: After installing packages, the kernel will need to be restarted and all the previous cells re-run (including the install package cell).\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "import pixiedust\n",
    "\n",
    "if sc.version.startswith('1.6.'):  # Spark 1.6\n",
    "    pixiedust.installPackage(\"graphframes:graphframes:0.5.0-spark1.6-s_2.11\")\n",
    "elif sc.version.startswith('2.'):  # Spark 2.1, 2.0\n",
    "    pixiedust.installPackage(\"graphframes:graphframes:0.5.0-spark2.1-s_2.11\")\n",
    "\n",
    "\n",
    "pixiedust.installPackage(\"com.typesafe.scala-logging:scala-logging-api_2.11:2.1.2\")\n",
    "pixiedust.installPackage(\"com.typesafe.scala-logging:scala-logging-slf4j_2.11:2.1.2\")\n",
    "\n",
    "print(\"done\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Create the GraphFrame from the Vertices and Edges Dataframes"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false,
    "pixiedust": {
     "displayParams": {
      "handlerId": "graphMap"
     }
    },
    "scrolled": false
   },
   "outputs": [],
   "source": [
    "from graphframes import GraphFrame\n",
    "g = GraphFrame(vertices, edges)\n",
    "display(g)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Compute the degree for each vertex in the graph\n",
    "The degree of a vertex is the number of edges incident to the vertex. In a directed graph, in-degree is the number of edges where vertex is the destination and out-degree is the number of edges where the vertex is the source. With GraphFrames, there is a degrees, outDegrees and inDegrees property that return a DataFrame containing the id of the vertext and the number of edges. We then sort then in descending order"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false,
    "pixiedust": {
     "displayParams": {
      "handlerId": "dataframe"
     }
    },
    "scrolled": false
   },
   "outputs": [],
   "source": [
    "from pyspark.sql.functions import *\n",
    "degrees = g.degrees.sort(desc(\"degree\"))\n",
    "display( degrees )"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Compute a list of shortest paths for each vertex to a specified list of landmarks\n",
    "For this we use the `shortestPaths` api that returns DataFrame containing the properties for each vertex plus an extra column called distances that contains the number of hops to each landmark.\n",
    "In the following code, we use BOS and LAX as the landmarks"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false,
    "pixiedust": {
     "displayParams": {
      "handlerId": "dataframe"
     }
    },
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "r = g.shortestPaths(landmarks=[\"BOS\", \"LAX\"]).select(\"id\", \"distances\")\n",
    "display(r)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Compute the pageRank for each vertex in the graph\n",
    "[PageRank](https://en.wikipedia.org/wiki/PageRank) is a famous algorithm used by Google Search to rank vertices in a graph by order of importance. To compute pageRank, we'll use the `pageRank` api that returns a new graph in which the vertices have a new `pagerank` column representing the pagerank score for the vertex and the edges have a new `weight` column representing the edge weight that contributed to the pageRank score. We'll then display the vertice ids and associated pageranks sorted descending: "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false,
    "pixiedust": {
     "displayParams": {
      "handlerId": "vertices"
     }
    },
    "scrolled": false
   },
   "outputs": [],
   "source": [
    "from pyspark.sql.functions import *\n",
    "\n",
    "ranks = g.pageRank(resetProbability=0.20, maxIter=5)\n",
    "\n",
    "rankedVertices = ranks.vertices.select(\"id\",\"pagerank\").orderBy(desc(\"pagerank\"))\n",
    "rankedEdges = ranks.edges.select(\"src\", \"dst\", \"weight\").orderBy(desc(\"weight\") )\n",
    "\n",
    "ranks = GraphFrame(rankedVertices, rankedEdges)\n",
    "display(ranks)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Search routes between 2 airports with specific criteria\n",
    "In this section, we want to find all the routes between Boston and San Francisco operated by United Airlines with at most 2 hops. To accomplish this, we use the `bfs` ([Breath First Search](https://en.wikipedia.org/wiki/Breadth-first_search)) api that returns a DataFrame containing the shortest path between matching vertices. For clarity will only keep the edge when displaying the results"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false,
    "pixiedust": {
     "displayParams": {
      "handlerId": "dataframe"
     }
    },
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "paths = g.bfs(fromExpr=\"id='BOS'\",toExpr=\"id = 'SFO'\",edgeFilter=\"carrierFsCode='UA'\", maxPathLength = 2)\\\n",
    "    .drop(\"from\").drop(\"to\")\n",
    "paths.cache()\n",
    "display(paths)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Find all airports that do not have direct flights between each other\n",
    "In this section, we'll use a very powerful graphFrames search feature that uses a pattern called [motif](http://graphframes.github.io/user-guide.html#motif-finding) to find nodes. The pattern we'll use the following pattern `\"(a)-[]->(b);(b)-[]->(c);!(a)-[]->(c)\"` which searches for all nodes a, b and c that have a path to (a,b) and a path to (b,c) but not a path to (a,c). \n",
    "Also, because the search is computationally expensive, we reduce the number of edges by grouping the flights that have the same src and dst."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false,
    "pixiedust": {
     "displayParams": {
      "handlerId": "dataframe"
     }
    },
    "scrolled": false
   },
   "outputs": [],
   "source": [
    "from pyspark.sql.functions import *\n",
    "\n",
    "h = GraphFrame(g.vertices, g.edges.select(\"src\",\"dst\")\\\n",
    "   .groupBy(\"src\",\"dst\").agg(count(\"src\").alias(\"count\")))\n",
    "\n",
    "query = h.find(\"(a)-[]->(b);(b)-[]->(c);!(a)-[]->(c)\").drop(\"b\")\n",
    "query.cache()\n",
    "display(query)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Compute the strongly connected components for this graph\n",
    "[Strongly Connected Components](https://en.wikipedia.org/wiki/Strongly_connected_component) are components for which each vertex is reachable from every other vertex. To compute them, we'll use the `stronglyConnectedComponents` api that returns a DataFrame containing all the vertices with the addition of a `component` column that has the component id in which the vertex belongs to. We then group all the rows by components and aggregate the sum of all the member vertices. This gives us a good idea of the components distribution in the graph"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false,
    "pixiedust": {
     "displayParams": {
      "handlerId": "dataframe"
     }
    },
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "from pyspark.sql.functions import *\n",
    "components = g.stronglyConnectedComponents(maxIter=10).select(\"id\",\"component\")\\\n",
    "    .groupBy(\"component\").agg(count(\"id\").alias(\"count\")).orderBy(desc(\"count\"))\n",
    "display(components)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Detect communities in the graph using Label Propagation algorithm\n",
    "[Label Propagation algorithm](https://en.wikipedia.org/wiki/Label_Propagation_Algorithm) is a popular algorithm for finding communities within a graph. It has the advantage to be computationally inexpensive and thus works well with large graphs. To compute the communities, we'll use the `labelPropagation` api that returns a DataFrame containing all the vertices with the addition of a `label` column that has the label id for the communities in which the vertex belongs to. Similar to the strongly connected components, we'll then group all the rows by label and aggregate the sum of all the member vertices."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false,
    "pixiedust": {
     "displayParams": {
      "handlerId": "dataframe"
     }
    }
   },
   "outputs": [],
   "source": [
    "from pyspark.sql.functions import *\n",
    "communities = g.labelPropagation(maxIter=5).select(\"id\", \"label\")\\\n",
    "    .groupBy(\"label\").agg(count(\"id\").alias(\"count\")).orderBy(desc(\"count\"))\n",
    "display(communities)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Use AggregateMessages to compute the average flight delays by originating airport\n",
    "\n",
    "AggregateMessages api is not currently available in Python, so we use PixieDust Scala bridge to call out the Scala API\n",
    "Note: Notice that PixieDust is automatically rebinding the python GraphFrame variable g into a scala GraphFrame with same name"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "%%scala\n",
    "import org.graphframes.lib.AggregateMessages\n",
    "import org.apache.spark.sql.functions.{avg,desc,floor}\n",
    "\n",
    "// For each airport, average the delays of the departing flights\n",
    "val msgToSrc = AggregateMessages.edge(\"deltaDeparture\")\n",
    "val __agg = g.aggregateMessages\n",
    "  .sendToSrc(msgToSrc)  // send each flight delay to source\n",
    "  .agg(floor(avg(AggregateMessages.msg)).as(\"averageDelays\"))  // average up all delays\n",
    "  .orderBy(desc(\"averageDelays\"))\n",
    "  .limit(10)\n",
    "__agg.cache()\n",
    "__agg.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false,
    "pixiedust": {
     "displayParams": {
      "aggregation": "SUM",
      "handlerId": "barChart",
      "keyFields": "id",
      "showLegend": "true",
      "stacked": "true",
      "staticFigure": "false",
      "title": "Average Flight delays by originating airport",
      "valueFields": "averageDelays"
     }
    }
   },
   "outputs": [],
   "source": [
    "display(__agg)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "pySpark (Spark 1.6.0) Python 2",
   "language": "python",
   "name": "pyspark1.6"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.11"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
